package controller

import (
	"context"

	"gitee.com/go-course/go9/projects/devcloud/maudit/apps/audit"
	"gitee.com/go-course/go9/projects/devcloud/maudit/common/logger"
	"github.com/infraboard/mcube/app"
	"github.com/segmentio/kafka-go"
)

// NewAuduitLogSaveConroller builds a controller configured with the given
// Kafka broker addresses, consumer group ID and topic.
//
// The audit service is looked up from the internal app registry by
// audit.AppName. The type assertion is unchecked, so construction panics if
// the value returned by the registry does not implement audit.Service.
// The Kafka reader itself is not created here; Run creates it.
func NewAuduitLogSaveConroller(
	brokerAddress []string,
	groupId,
	topic string,
) *AuduitLogSaveConroller {
	// Resolve the dependency first so the controller is filled in one pass.
	svc := app.GetInternalApp(audit.AppName).(audit.Service)

	ctl := new(AuduitLogSaveConroller)
	ctl.brokerAddress = brokerAddress
	ctl.groupId = groupId
	ctl.topic = topic
	ctl.audit = svc
	return ctl
}

// AuduitLogSaveConroller saves audit logs. It carries the Kafka connection
// settings, the reader used to consume messages, and the audit service the
// consumed logs are handed to.
type AuduitLogSaveConroller struct {
	brokerAddress []string // Kafka broker addresses passed to the reader config
	groupId       string   // consumer group ID passed to the reader config
	topic         string   // topic passed to the reader config

	r     *kafka.Reader // reader assigned in Run and closed by Stop; nil until Run is called
	audit audit.Service // audit service whose SaveAuditLog is called for each decoded log
}

// Run consumes audit-log messages from Kafka and saves them through the
// audit service.
//
// It creates a kafka.Reader from the controller's brokers, consumer group
// and topic, and stores it on the controller so Stop can close it. Each
// message value is decoded with audit.LoadAuditLogFromJosn and passed to
// SaveAuditLog. A message that fails to decode or save is logged and
// skipped; it is not retried here.
//
// Run blocks until ReadMessage returns an error, which includes ctx being
// cancelled. The reason for leaving the loop is always logged.
func (c *AuduitLogSaveConroller) Run(ctx context.Context) {
	// Create the reader that consumes from the configured topic.
	c.r = kafka.NewReader(kafka.ReaderConfig{
		Brokers: c.brokerAddress,
		// Consumer group; if left unset the reader acts as a plain,
		// standalone consumer.
		GroupID: c.groupId,
		// A specific partition could be consumed instead:
		// Partition: 0,
		Topic:    c.topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})

	for {
		m, err := c.r.ReadMessage(ctx)
		if err != nil {
			// Report why the consumer stops instead of exiting silently.
			// A done context means a requested shutdown, so it is logged
			// at debug level; anything else is reported as an error.
			if ctx.Err() != nil {
				logger.L().Debug().Msgf("audit log consumer stopped, %s", err)
			} else {
				logger.L().Error().Msgf("read audit log message error, consumer exiting, %s", err)
			}
			return
		}
		logger.L().Debug().Msgf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))

		ins, err := audit.LoadAuditLogFromJosn(m.Value)
		if err != nil {
			// A malformed payload must not stop the consumer.
			logger.L().Error().Msgf("load audit log error, %s", err)
			continue
		}

		if _, err = c.audit.SaveAuditLog(ctx, ins); err != nil {
			logger.L().Error().Msgf("save audit log error, %s", err)
		}
	}
}

// Stop closes the Kafka reader created by Run and returns the error reported
// by Close. If Run has not created a reader yet, Stop does nothing and
// returns nil.
func (c *AuduitLogSaveConroller) Stop() error {
	// The reader is only assigned inside Run; without this guard, calling
	// Stop first would invoke Close on a nil *kafka.Reader.
	if c.r == nil {
		return nil
	}
	return c.r.Close()
}
